Support co-partitioned range inner equi joins#23184

Open
gene-bordegaray wants to merge 2 commits into
apache:mainfrom
gene-bordegaray:gene.bordegaray/2026/06/range-partitioned-joins

gene-bordegaray commented Jun 25, 2026

Contributor

Which issue does this PR close?

Rationale for this change

DataFusion can represent source-declared range partitioning, but partitioned hash joins still required hash partitioned inputs. So an inner join on compatible range-partitioned keys would insert unnecessary hash repartitions, even when each left/right partition already covered the same key domain.

This PR adds a partitioning requirement that means "equal key values are co-located". I was calling this "compatibility" but found we can satisfy the requirement with looser conditions. Other systems (Trino, Spark) call this "co-location" or "co-partitioning". They define it (and I am now proposing the same definition) as both sides of a join already being partitioned so that matching key values appear in corresponding partitions, which lets us join partition pairs directly without repartitioning either side.

This lets "co-partitioned" range inputs satisfy inner partitioned hash joins. The same requirement will also apply to other join types and operators, but I kept this first PR thin to keep the scope reviewable.

What changes are included in this PR?

  • Adds Distribution::KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>) as a public distribution requirement.

    • HashPartitioned([a]) means rows must be partitioned by hash on a.
    • KeyPartitioned([a]) means rows with equal a values must be co-located, but the partitioning algorithm may be hash, range, or another compatible scheme.
    • Example:
      Hash([left.a], 3) satisfies KeyPartitioned([left.a])
      Range([right.b ASC], [(10), (20)], 3) satisfies KeyPartitioned([right.b])
      
  • Adds Partitioning::co_partitioned_with(...) to validate that two independently satisfying partitionings also can be paired by partition index.

    • Examples:
      • Accepted: both sides satisfy their own key requirement and have matching range boundaries.
        left:  Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
        right: Range([b ASC], [(10), (20)], 3), required KeyPartitioned([b])
        
      • Accepted: both sides satisfy their own key requirement and have matching hash partition counts.
        left:  Hash([a], 3), required KeyPartitioned([a])
        right: Hash([b], 3), required KeyPartitioned([b])
        
      • Rejected: both sides satisfy their own key requirement, but range boundaries differ.
        left:  Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
        right: Range([b ASC], [(15), (20)], 3), required KeyPartitioned([b])
        
      • Rejected: both sides satisfy their own key requirement, but partition counts differ.
        left:  Hash([a], 3), required KeyPartitioned([a])
        right: Hash([b], 4), required KeyPartitioned([b])
        
  • Changes inner partitioned HashJoinExec requirements from HashPartitioned to KeyPartitioned.

    • All other hash joins still require HashPartitioned for now.
  • Updates EnforceDistribution so co-partitioned range inner joins avoid repartitioning.

    • Examples:
      • Compatible range partitioning: no repartition is inserted because partitions can be joined by index.
        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
          DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
          DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
        
      • Incompatible range boundaries: both sides are repartitioned by hash because partition i does not represent the same key domain on both sides.
        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
          RepartitionExec: partitioning=Hash([a], target_partitions)
            DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
          RepartitionExec: partitioning=Hash([b], target_partitions)
            DataSourceExec: output_partitioning=Range([b ASC], [(15), (20)], 3)
        
      • Mismatched hash partition counts: both sides are forced to the target hash partition count so partition indexes line up.
        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
          RepartitionExec: partitioning=Hash([a], target_partitions)
            DataSourceExec: output_partitioning=Hash([a], 11)
          RepartitionExec: partitioning=Hash([b], target_partitions)
            DataSourceExec: output_partitioning=Hash([b], 12)
        
      • Non-inner joins: range inputs still get hash repartitioning because only inner partitioned hash joins use KeyPartitioned in this PR.
        HashJoinExec: mode=Partitioned, join_type=Left, on=[(a, b)]
          RepartitionExec: partitioning=Hash([a], target_partitions)
            DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
          RepartitionExec: partitioning=Hash([b], target_partitions)
            DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
        
  • Keeps partitioned dynamic filter pushdown restricted to hash-compatible routing.

    • Compatible range partitioning can satisfy the join, but dynamic filters still route by hash, so range/range partitioned joins disable dynamic filters.
  • Degrades range join output partitioning to UnknownPartitioning(n) rather than erroring. Adding this behavior would need more tests and careful thought, so I think it's safer to just degrade for the first PR.
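
The accepted and rejected range examples in the list above can be made concrete with a small standalone sketch. This is not DataFusion code: the helper names `range_partition_index` and `pairable_by_index` are hypothetical, keys are plain `i64` values, and the convention that a key equal to a split point lands in the higher partition is an assumption made only for illustration.

```rust
/// Hypothetical helper: index of the range partition that `key` falls into,
/// given sorted split points. With split points [10, 20] there are three
/// partitions: below 10, from 10 up to (not including) 20, and 20 or above.
/// The inclusive/exclusive convention is an assumption of this sketch.
fn range_partition_index(split_points: &[i64], key: i64) -> usize {
    // Counts how many split points are <= key.
    split_points.partition_point(|&boundary| boundary <= key)
}

/// Hypothetical check: the simplest sufficient condition for joining two
/// range layouts partition-by-partition is that their split points are
/// identical, so every key maps to the same index on both sides.
fn pairable_by_index(left_split_points: &[i64], right_split_points: &[i64]) -> bool {
    left_split_points == right_split_points
}
```

With boundaries `[10, 20]` on the left and `[15, 20]` on the right, key 12 maps to partition 1 on the left but partition 0 on the right, which is why that pair has to be repartitioned.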

Are these changes tested?

Yes.

  • KeyPartitioned satisfaction for hash and range partitioning.
  • co_partitioned_with for compatible and incompatible range/hash partitioning.
  • EnforceDistribution behavior for:
    • compatible range joins avoiding hash repartitioning
    • incompatible range bounds rehashing
    • mismatched hash partition counts rehashing
    • non-inner range joins rehashing
  • sanity checking for invalid partitioned hash joins.
  • dynamic filter rejection for range partitioning, preserved file partitions, and mismatched hash counts.
  • sqllogictest coverage for range-partitioned joins avoiding hash repartitioning and non-range joins still repartitioning.

Are there any user-facing changes?

Yes.

This PR changes public physical planning APIs:

  • Adds Distribution::KeyPartitioned.
  • Adds Partitioning::co_partitioned_with.
    • NOTE: This replaces the previous partition compatibility API with the new co-partitioning API. Since the compatibility API was never in a release I believe this is OK to do (lesson learned: do not make an API change until we have a definitive consumer).
  • Affects users matching exhaustively on Distribution.
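
To illustrate the last point, here is a minimal sketch of what "matching exhaustively" means for downstream code. The enum below is a simplified stand-in, not the real type: it carries key names as `String` instead of `Vec<Arc<dyn PhysicalExpr>>`, and `required_keys` is a hypothetical downstream function.

```rust
/// Simplified stand-in for the real enum (assumption: key names as strings).
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
    /// The variant added by this PR.
    KeyPartitioned(Vec<String>),
}

/// Hypothetical downstream code that matches every variant without a
/// wildcard arm. Before the new variant is listed here, this `match` fails
/// to compile against the updated enum.
fn required_keys(dist: &Distribution) -> Option<&[String]> {
    match dist {
        Distribution::UnspecifiedDistribution | Distribution::SinglePartition => None,
        Distribution::HashPartitioned(keys) | Distribution::KeyPartitioned(keys) => {
            Some(keys.as_slice())
        }
    }
}
```

A wildcard arm (`_ => ...`) would keep compiling after the upgrade, but it would silently treat `KeyPartitioned` like whatever the fallback does, so callers may still want to review such matches.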

@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Jun 25, 2026
@gene-bordegaray gene-bordegaray changed the title Support co-partitioned range hash joins [WIP] Support co-partitioned range hash joins Jun 25, 2026
github-actions Bot commented Jun 25, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion v54.0.0 (current)
       Built [ 110.879s] (current)
     Parsing datafusion v54.0.0 (current)
      Parsed [   0.034s] (current)
    Building datafusion v54.0.0 (baseline)
       Built [ 111.684s] (baseline)
     Parsing datafusion v54.0.0 (baseline)
      Parsed [   0.036s] (baseline)
    Checking datafusion v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.613s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 224.715s] datafusion
    Building datafusion-physical-expr v54.0.0 (current)
       Built [  31.088s] (current)
     Parsing datafusion-physical-expr v54.0.0 (current)
      Parsed [   0.049s] (current)
    Building datafusion-physical-expr v54.0.0 (baseline)
       Built [  29.353s] (baseline)
     Parsing datafusion-physical-expr v54.0.0 (baseline)
      Parsed [   0.050s] (baseline)
    Checking datafusion-physical-expr v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.351s] 223 checks: 221 pass, 2 fail, 0 warn, 30 skip

--- failure enum_variant_added: enum variant added on exhaustive enum ---

Description:
A publicly-visible enum without #[non_exhaustive] has a new variant.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#enum-variant-new
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/enum_variant_added.ron

Failed in:
  variant Distribution:KeyPartitioned in /home/runner/work/datafusion/datafusion/datafusion/physical-expr/src/partitioning.rs:657

--- failure inherent_method_missing: pub method removed or renamed ---

Description:
A publicly-visible method or associated fn is no longer available under its prior name. It may have been renamed or removed entirely.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#item-remove
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/inherent_method_missing.ron

Failed in:
  RangePartitioning::compatible_with, previously in file /home/runner/work/datafusion/datafusion/target/semver-checks/git-apache_main/0d246001df27742061abe27be3b48b639f0f1259/datafusion/physical-expr/src/partitioning.rs:253
  Partitioning::compatible_with, previously in file /home/runner/work/datafusion/datafusion/target/semver-checks/git-apache_main/0d246001df27742061abe27be3b48b639f0f1259/datafusion/physical-expr/src/partitioning.rs:416

     Summary semver requires new major version: 2 major and 0 minor checks failed
    Finished [  61.851s] datafusion-physical-expr
    Building datafusion-physical-optimizer v54.0.0 (current)
       Built [  38.555s] (current)
     Parsing datafusion-physical-optimizer v54.0.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-physical-optimizer v54.0.0 (baseline)
       Built [  38.544s] (baseline)
     Parsing datafusion-physical-optimizer v54.0.0 (baseline)
      Parsed [   0.023s] (baseline)
    Checking datafusion-physical-optimizer v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.125s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  78.268s] datafusion-physical-optimizer
    Building datafusion-physical-plan v54.0.0 (current)
       Built [  36.064s] (current)
     Parsing datafusion-physical-plan v54.0.0 (current)
      Parsed [   0.130s] (current)
    Building datafusion-physical-plan v54.0.0 (baseline)
       Built [  36.732s] (baseline)
     Parsing datafusion-physical-plan v54.0.0 (baseline)
      Parsed [   0.137s] (baseline)
    Checking datafusion-physical-plan v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.646s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  74.774s] datafusion-physical-plan
    Building datafusion-sqllogictest v54.0.0 (current)
       Built [ 178.529s] (current)
     Parsing datafusion-sqllogictest v54.0.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-sqllogictest v54.0.0 (baseline)
       Built [ 179.500s] (baseline)
     Parsing datafusion-sqllogictest v54.0.0 (baseline)
      Parsed [   0.022s] (baseline)
    Checking datafusion-sqllogictest v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.088s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 361.863s] datafusion-sqllogictest

@github-actions github-actions Bot added the auto detected api change Auto detected API change label Jun 25, 2026
/// This is optimizer policy: partitioned joins require children that can be
/// paired by partition index. Inner hash joins can reuse compatible range
/// partitioning; otherwise the existing hash repartitioning policy applies.
fn partitioned_join_distribution(

Contributor Author

I noticed that simple checking logic is slightly duplicated here, here, and here

There may be a good way to extract this, but I didn't want to preemptively make a public change on speculation; it is just something I am noting. Let me know if anyone has suggestions.

/// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a)
/// right: Range(right.x ASC, [15, 20]), required KeyPartitioned(right.x)
/// ```
pub fn co_partitioned_with(

Contributor Author

I realized the concept was a bit off. I think this is ok if we haven't had a release... 😅

I linked the mirroring concepts that are used in Trino and Spark in the PR description.

Lesson learned: have a consumer of the public API first.

Contributor

👍 I don't think it's a big deal; this is not the typical method external consumers rely on anyway.

Contributor

Without taking a look at the usages of this method, I do notice that the signature seems a bit off: why would it need the required Distribution for checking if two Partitionings are co-partitioned? it seems unrelated.

I'll keep reading though

Contributor Author

Ya, I thought similarly, but this matters for multi-child operators. For joins, each side may have a different schema:

left:  Partitioning::Range(left.a)
right: Partitioning::Range(right.x)
join:  left.a = right.x

So the expressions left.a and right.x are not equal, since they come from different children. Because of this we need to check each side's required Distribution to know what each partitioning has to satisfy:

left requirement:  key partitioned on left.a
right requirement: key partitioned on right.x

So we need to check:

  1. left partitioning satisfies left requirement
  2. right partitioning satisfies right requirement
  3. the two partition mappings can be paired

I could do something like this:

struct RequiredPartitioning<'a> {
    partitioning: &'a Partitioning,
    requirement: &'a Distribution,
    eq_properties: &'a EquivalenceProperties,
}

impl RequiredPartitioning<'_> {
    fn co_partitioned_with(&self, other: &RequiredPartitioning<'_>) -> bool
}

or derive the EquivalenceProperties in the method if we pass the children directly
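
For readers following along, the three checks listed above could look roughly like this in a toy model. Everything here is a hypothetical simplification: the `Partitioning` enum below uses column names and `i64` split points instead of physical expressions and equivalence properties, and requiring equal hash partition counts is an assumption taken from the examples in the PR description, not from the actual implementation.

```rust
/// Toy stand-in for the real partitioning type (assumption: string keys).
enum Partitioning {
    Hash { keys: Vec<String>, partitions: usize },
    Range { key: String, split_points: Vec<i64> },
}

impl Partitioning {
    /// Checks 1 and 2: does this side satisfy its own key requirement?
    fn satisfies_key_partitioned(&self, required: &[String]) -> bool {
        match self {
            Partitioning::Hash { keys, .. } => keys.as_slice() == required,
            Partitioning::Range { key, .. } => required.len() == 1 && required[0] == *key,
        }
    }
}

/// Each side is validated against its own requirement, because `left.a` and
/// `right.x` are different expressions that only the join condition relates.
fn co_partitioned(
    left: &Partitioning,
    left_required: &[String],
    right: &Partitioning,
    right_required: &[String],
) -> bool {
    if !left.satisfies_key_partitioned(left_required)
        || !right.satisfies_key_partitioned(right_required)
    {
        return false;
    }
    // Check 3: can the two partition mappings be paired by index?
    match (left, right) {
        (
            Partitioning::Hash { partitions: l, .. },
            Partitioning::Hash { partitions: r, .. },
        ) => l == r,
        (
            Partitioning::Range { split_points: l, .. },
            Partitioning::Range { split_points: r, .. },
        ) => l == r,
        _ => false,
    }
}
```

The point of the sketch is only that the pairing step never compares `left.a` with `right.x` directly; it compares layouts after each side has been checked against its own requirement.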

@gene-bordegaray

Contributor Author

cc: @gabotechs @stuhood

@gene-bordegaray gene-bordegaray changed the title [WIP] Support co-partitioned range hash joins Support co-partitioned range hash joins Jun 25, 2026
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/06/range-partitioned-joins branch from 8583303 to 45d598b Compare June 25, 2026 12:59
@gene-bordegaray gene-bordegaray marked this pull request as ready for review June 25, 2026 13:04
# TEST 3b: Non-Range Join Repartitions
# The source Range partitioning does not satisfy a join on non_range_key, so
# planning still inserts Hash repartitioning.
##########

Contributor Author

I can probably eliminate this; it is almost a sanity check that is nice to see after the positive case.

stuhood commented Jun 25, 2026

Contributor

Amazing timing. Will get you some feedback on this by early next week! Thank you!

gene-bordegaray commented Jun 25, 2026

Contributor Author

Amazing timing. Will get you some feedback on this by early next week!

@stuhood great, thank you! I also have a huge line of follow up issues to support more join types that we can split up 👍

I am trying to make smaller tickets as more of the plumbing gets in to get more people involved

@gene-bordegaray gene-bordegaray changed the title Support co-partitioned range hash joins Support co-partitioned range inner equi joins Jun 25, 2026
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/06/range-partitioned-joins branch from 45d598b to 2fca2bb Compare June 26, 2026 15:21
gene-bordegaray commented Jun 26, 2026

Contributor Author

I have added a stacked PR that shows how I imagine Aggregations will consume the KeyPartitioned API: gene-bordegaray#6 (not done yet 😄)

gabotechs left a comment

Contributor

This is looking good! Left a first round of comments.

The main one is about trying to collapse Distribution::KeyPartitioned and Distribution::HashPartitioned into a single Distribution variant. In pretty much all places the code that handles both is the same, and I get the feeling that Distribution::HashPartitioned is just an unfortunate name that was chosen a while back; it's not really coupled to hashes.

Comment on lines +645 to 658
/// Requires rows with equal values for the given keys to be colocated in
/// the same partition, without requiring a specific partitioning algorithm.
///
/// Unlike [`Self::HashPartitioned`], this can be satisfied by non-hash
/// partitioning such as range partitioning. A partitioning on a subset of
/// these keys can also satisfy this requirement because rows equal on all
/// required keys are also equal on any subset.
///
/// For multi-input operators, satisfaction alone is not enough: each input
/// may satisfy its own key requirement while using incompatible partition
/// boundaries. Use [`Partitioning::co_partitioned_with`] before pairing
/// partitions by index.
KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}

Contributor

I'm wondering if there's a way to condense HashPartitioned and KeyPartitioned into just one. Take a look at the Distribution::HashPartitioned docs:

Requires children to be distributed in such a way that the same values of the keys end up in the same partition

To meet that distribution, I don't think it's really necessary that children are partitioned specifically with a hashing algorithm; they can be partitioned in any way that satisfies that invariant (same values of the keys end up in the same partition).

I'm yet to read the full PR, but I'm getting the feeling that the previous HashPartitioned naming is just unfortunate, and could probably have been just named KeyPartitioned from the beginning.

What's your opinion on trying to rename HashPartitioned to KeyPartitioned instead?

Contributor Author

ya I agree with this long-term 🤔

What do you think about deprecating HashPartitioned, then introducing KeyPartitioned and letting this PR be the first real consumer of it? Then we don't allow range to satisfy KeyPartitioned generally here, and only allow range to satisfy the distribution in the partitioned join case.

Then in following PRs we slowly add some private helpers to allow range to satisfy KeyPartitioned for operators like aggregations. Once it's at a reasonable scope (a good number of operators are working and tested thoroughly), we add range to the general satisfaction() public method.

Trying to think how to do this incrementally

}
equivalent_exprs(left_exprs, right_exprs, eq_properties)
}
(Partitioning::Hash(_, _), Partitioning::Hash(_, _)) => true,

Contributor

🤔 Is this correct?

What if the left Partitioning::Hash contains 1 expression but the right Partitioning::Hash contains 6 completely different expressions? I'm not sure if returning true in that case is correct.

Contributor Author

This is only correct because of the check before it:

if !self
    .satisfaction(required, eq_properties, false)
    .is_satisfied()
    || !other
        .satisfaction(other_required, other_eq_properties, false)
        .is_satisfied()
{
    return false;
}

That check makes sure each side satisfies its join distribution, so Hash(a) vs Hash(b, c, ...) wouldn't reach this arm.

Something like this up front may be cleaner and more obvious though:

let left_satisfies = matches!(
    self.satisfaction(required, eq_properties, false),
    PartitioningSatisfaction::Exact
);

let right_satisfies = matches!(
    other.satisfaction(other_required, other_eq_properties, false),
    PartitioningSatisfaction::Exact
);

if !left_satisfies || !right_satisfies {
    return false;
}

Comment on lines 563 to 573
Distribution::HashPartitioned(required_exprs) => match self {
// Here we do not check the partition count for hash partitioning and assumes the partition count
// and hash functions in the system are the same. In future if we plan to support storage partition-wise joins,
// then we need to have the partition count and hash functions validation.
Partitioning::Hash(partition_exprs, _) => {
// Empty hash partitioning is invalid
if partition_exprs.is_empty() || required_exprs.is_empty() {
return PartitioningSatisfaction::NotSatisfied;
}

if equivalent_exprs(required_exprs, partition_exprs, eq_properties) {
return PartitioningSatisfaction::Exact;
}

let eq_groups = eq_properties.eq_group();
if !eq_groups.is_empty() {
if allow_subset {
let normalized_partition_exprs =
normalize_exprs(partition_exprs, eq_properties);
let normalized_required_exprs =
normalize_exprs(required_exprs, eq_properties);
if Self::is_subset_partitioning(
&normalized_partition_exprs,
&normalized_required_exprs,
) {
return PartitioningSatisfaction::Subset;
}
}
} else if allow_subset
&& Self::is_subset_partitioning(partition_exprs, required_exprs)
{
return PartitioningSatisfaction::Subset;
}

PartitioningSatisfaction::NotSatisfied
}
Partitioning::Hash(partition_exprs, _) => Self::key_expr_satisfaction(
partition_exprs,
required_exprs,
eq_properties,
allow_subset,
),
Partitioning::RoundRobinBatch(_)

Contributor

The fact that satisfying both Distribution::KeyPartitioned and Distribution::HashPartitioned with Partitioning::Hash requires exactly the same code reinforces my comment above. The longer I look the more I think we should collapse Distribution::HashPartitioned and Distribution::KeyPartitioned into just 1

hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
allow_subset_satisfy_partitioning: bool,
force_to_target: bool,

Contributor

😬 if-driven-development alert

Contributor

I'll keep reading to see if I can suggest something better

Contributor

Ok, it's not that bad, but at this point I don't think the add_hash_on_top is working well as an abstraction.

The output I would expect from this function is that it adds a hash repartition on top, but it might not do that, it depends on some conditional logic. There's some relatively good docs that make up for it, but rather than documenting the weirdness, it might be worth not having it at all from the beginning.

If you ask me, I'd just remove this function and inline the body in the only place where it's used:

Code suggestion
                Distribution::HashPartitioned(exprs)
                | Distribution::KeyPartitioned(exprs) => 'curr_match_branch: {
                    // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
                    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
                    if !hash_necessary {
                        break 'curr_match_branch;
                    }

                    // Early return if hash repartition is unnecessary
                    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
                    if target_partitions == 1 && current_partitions == 1 {
                        break 'curr_match_branch;
                    }

                    let is_satisfied = child
                        .plan
                        .output_partitioning()
                        .satisfaction(
                            &Distribution::HashPartitioned(exprs.to_vec()),
                            child.plan.equivalence_properties(),
                            allow_subset_satisfy_partitioning,
                        )
                        .is_satisfied();
                    // Add hash repartitioning when:
                    // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied
                    // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism
                    let needs_repartition = if force_hash_to_target {
                        !is_satisfied || target_partitions != current_partitions
                    } else if allow_subset_satisfy_partitioning {
                        !is_satisfied
                    } else {
                        !is_satisfied || target_partitions > current_partitions
                    };
                    if needs_repartition {
                        let part = Partitioning::Hash(exprs.to_vec(), target_partitions);
                        child.plan = Arc::new(
                            RepartitionExec::try_new(child.plan, part)?
                                .with_preserve_order(),
                        );
                    }
                }

This trims a fair amount of code (35 insertions(+), 81 deletions(-)) and is easier to follow, since readers do not need to jump back and forth.

Contributor

This is a nit though, what you have right now in this PR is also good, so only take the suggestion if you want.

Contributor Author

Ya I think you are probably right, I am going to play with this once we talk about the larger issues first and see what fits nicely 👍

@2010YOUY01

Contributor

I think KeyPartitioned is a really clever idea. I have some thoughts on simplifying the implementation. This is basically the same point as @gabotechs, but expressed slightly differently.

Key idea

KeyPartitioned(expr) is the logical generalization of HashPartitioned(expr): it means that rows with equal values for expr are guaranteed to be in the same partition.

Hash partitioning and range partitioning are two concrete ways to satisfy this requirement.

More precisely:

A plan is KeyPartitioned(k) iff:

for any two rows r1 and r2 in the same input:
    if r1.k = r2.k
    then partition(r1) = partition(r2)

Equivalently, different output partitions contain disjoint sets of key values.

This seems to cover the logical requirement that HashPartitioned(expr) is trying to express today, and it also covers the non-overlapping range partitioning cases. Unifying them should make the implementation simpler.
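
As a rough illustration of the definition above, the property can be checked directly on a set of rows. This is a hypothetical standalone sketch, not planner code: rows are modeled as `(key, partition id)` pairs and `is_key_partitioned` is an invented name.

```rust
use std::collections::HashMap;

/// Returns true if rows with equal keys always share a partition id.
/// `rows` holds (key, partition id) pairs from a single input.
fn is_key_partitioned(rows: &[(i64, usize)]) -> bool {
    let mut partition_of_key: HashMap<i64, usize> = HashMap::new();
    for &(key, partition) in rows {
        // `insert` returns the partition previously recorded for this key.
        if let Some(previous) = partition_of_key.insert(key, partition) {
            if previous != partition {
                return false;
            }
        }
    }
    true
}
```

Note that the check says nothing about how the partition id was computed, which is the sense in which hash and range partitioning are both just ways to satisfy it.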

Example 1: aggregation has one input

select k, avg(v)
from t1
group by k

AggregateExec requires its input to be partitioned by the group keys. Conceptually, that requirement is KeyPartitioned(k).

That means all rows with the same k value must be in the same input partition, so each partition can compute its local groups independently.

For example, both of the following satisfy this requirement:

  • hash partitioned by k
  • range partitioned by k, assuming the ranges are non-overlapping

In either case, we do not need to insert an extra hash repartition before the aggregation.
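
A small standalone sketch of why this works: if every `k` lives in exactly one partition, aggregating each partition independently and concatenating the outputs gives the final answer. The function names and the `(k, v)` row representation are assumptions for illustration only.

```rust
use std::collections::BTreeMap;

/// Computes avg(v) grouped by k within a single partition.
fn partition_avg(rows: &[(i64, f64)]) -> BTreeMap<i64, f64> {
    let mut sums: BTreeMap<i64, (f64, u32)> = BTreeMap::new();
    for &(k, v) in rows {
        let entry = sums.entry(k).or_insert((0.0, 0));
        entry.0 += v;
        entry.1 += 1;
    }
    sums.into_iter()
        .map(|(k, (sum, count))| (k, sum / f64::from(count)))
        .collect()
}

/// Aggregates every partition on its own and concatenates the results.
/// The output is a correct `GROUP BY k` only if the input is key partitioned
/// on k; otherwise the same k can appear in several output rows.
fn partitioned_avg(partitions: &[Vec<(i64, f64)>]) -> Vec<(i64, f64)> {
    partitions.iter().flat_map(|p| partition_avg(p)).collect()
}
```

If key 1 were split across two partitions, `partitioned_avg` would emit two rows for `k = 1`, each with a partial average, which is exactly the situation a repartition is normally inserted to prevent.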

Example 2: hash join has two inputs

select *
from t1
join t2
on t1.v1 = t2.v1

For a partitioned hash join, it is not enough for each side to be independently KeyPartitioned.

The two sides must also be co-partitioned with respect to the join keys. That means equal join-key values from the left and right inputs must be assigned to the same partition id.

The physical execution pattern is:

Union of:

join(t1_partition_0, t2_partition_0)
join(t1_partition_1, t2_partition_1)
join(t1_partition_2, t2_partition_2)
...

So the required co-partitioning rule is:

left and right are co-partitioned on left.a = right.b iff:

for any left row l and right row r:
    if l.a = r.b
    then partition(l) = partition(r)

This implies some extra compatibility requirements across the two inputs:

  • If both sides are hash partitioned, they must use compatible hash semantics: the same hash function, seed/salt, null handling, and modulo / partition count.
  • If both sides are range partitioned, they must use compatible range boundaries. The simplest case is the same partition count and the same split points. In the future, we may be able to relax this to support more general compatible range layouts.
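
The execution pattern and the rule above can be sketched with a toy partition-wise join. This is an illustrative model only: `Row`, `join_pair`, and `partitioned_join` are hypothetical names, and the nested-loop join stands in for the real hash join.

```rust
/// A row is (join key, payload); the representation is an assumption.
type Row = (i64, &'static str);

/// Inner equi-join of one left/right partition pair.
fn join_pair(left: &[Row], right: &[Row]) -> Vec<(i64, &'static str, &'static str)> {
    let mut out = Vec::new();
    for &(left_key, left_value) in left {
        for &(right_key, right_value) in right {
            if left_key == right_key {
                out.push((left_key, left_value, right_value));
            }
        }
    }
    out
}

/// Joins partition i of the left input with partition i of the right input
/// and concatenates the results. Matches are only found when equal keys
/// were assigned the same partition id on both sides.
fn partitioned_join(
    left: &[Vec<Row>],
    right: &[Vec<Row>],
) -> Vec<(i64, &'static str, &'static str)> {
    assert_eq!(left.len(), right.len(), "partition counts must match");
    left.iter()
        .zip(right)
        .flat_map(|(l, r)| join_pair(l, r))
        .collect()
}
```

When both inputs are laid out with split points 10 and 20, key 12 sits in partition 1 on both sides and is matched. If the right side instead used split points 15 and 20, key 12 would sit in its partition 0 and the pairwise join would silently drop the match.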

So I think there are two related concepts:

KeyPartitioned(k):
    a per-input property:
    equal keys within this input go to the same partition

CoPartitioned(left.k, right.k):
    a cross-input property:
    equal join keys across both inputs go to the same partition id

Implementation plan

Here are my initial thoughts on an implementation plan (note that I haven't read the related code carefully, so these are just rough ideas):

  1. Collapse Distribution::HashPartitioned into Distribution::KeyPartitioned conceptually, or rename HashPartitioned to KeyPartitioned, and update the docs to describe the more general semantics.
  2. Update and refine the existing test coverage for hash partitioning under the new semantics.
  3. Support aggregation first. Since aggregation only has one input, it only needs the unary KeyPartitioned property. If the input is already range partitioned by the group keys, we should not insert an extra hash repartition.
  4. Support hash join next. Since join has two inputs, we also need to validate the cross-input co-partitioning requirement described above.

Development

Successfully merging this pull request may close these issues.

Allow co-partitioned range inputs to satisfy inner partitioned hash joins

4 participants